查看原文
其他

C# 使用 RabbitMq 队列

DotNet 2021-09-23

(给DotNet加星标,提升.Net技能

转自:做自己-jason
cnblogs.com/Fengge518/p/13826983.html

一、RabbitMQ是个啥?


RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。


RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库


二、使用RabbitMQ有啥好处?


RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。


AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。


AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。


RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。


对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。


RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式。


三、RabbitMq的安装以及环境搭建等


网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!


3.1、运行容器的命令如下:


docker run -d --hostname Log --restart=always
--name rabbitmq -p 5672:5672 -p 15672:15672
-e RABBITMQ_DEFAULT_USER=log_user
-e RABBITMQ_DEFAULT_PASS=331QQFEG123
rabbitmq:3-management


四、RabbitMq的使用场景主要有哪些,啥时候用或者不用?


4.1、什么时候使用MQ?


1)数据驱动的任务依赖


2)上游不关心多下游执行结果


3)异步返回执行时间长


4.2、什么时候不使用MQ?


需要实时关注执行结果 (eg:同步调用)


五、具体C#怎么使用RabbitMq ?


下面直接上code和测试截图了(Demo环境是.NET Core 3.1控制台+Docker上的RabbitMQ容器来进行的)


六、sample模式


就是简单地队列模式,一进一出的效果差不多,测试截图:




Code:


//简单生产端 ui调用者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
//就是简单的队列,生产者Console.WriteLine("====RabbitMqPublishDemo====");
for (int i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
}
Console.WriteLine("生成完毕!");
Console.ReadLine();
}
}
}

/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{
using (IConnection conn = connectionFactory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var msgBody = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
}
}
}

//简单消费端
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
Console.WriteLine("====RabbitMqConsumerDemo====");
ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
{
Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
});
Console.ReadLine();
}
}
}


#region 简单生产者后端逻辑
/// <summary>
/// 简单消费者
/// </summary>
/// <param name="queueName">队列名称</param>
/// <param name="isBasicNack">失败后是否自动放到队列</param>
/// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
{

Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
IConnection conn = connectionFactory.CreateConnection();
IModel channel = conn.CreateModel();
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
byte[] bymsg = ea.Body.ToArray();
string msg = Encoding.UTF8.GetString(bymsg);
if (handleMsgStr != null)
{
handleMsgStr.Invoke(msg);
}
else
{
Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
}
};
channel.BasicConsume(queueName, autoAck: true, consumer);
}
#endregion


七、Work模式




//就如下的code, 多次生产,3个消费者都可以自动开始消费
//生产者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
for (int i = 0; i < 500; i++)
{ ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :发布消息成功{i}");
}
Console.WriteLine("工作队列模式 生成完毕......!");
Console.ReadLine();
}
}
}
//生产者后端逻辑
public static void PublishWorkQueueModel(string queueName, string msg)
{
using (var connection = connectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var body = Encoding.UTF8.GetBytes(msg);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
}
}
//work消费端
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
Console.WriteLine("====Work模式开启了====");
ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>
{
Console.WriteLine($"work模式获取到消息{msg}");
});
Console.ReadLine();
}
}
}

//work后端逻辑
public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
{
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");
consumer.Received += (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if (handserMsg != null)
{
if (!string.IsNullOrEmpty(message))
{
handserMsg.Invoke(message);
}
}
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}


八、Fanout



Code:

//同一个消息会被多个订阅者消
//发布者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
#region 发布订阅模式,带上了exchange
for (int i = 0; i < 500; i++)
{
ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"发布的消息是:{i}");
}
Console.WriteLine("发布ok!");
#endregion
Console.ReadLine();
}
}
}
//发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout
public static void PublishExchangeModel(string exchangeName, string message)
{
using (var connection = connectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
Console.WriteLine($" Sent {message}");
}
}
//订阅者
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
#region 发布订阅模式 Exchange
ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>
{
Console.WriteLine($"订阅到消息:{msg}");
});
#endregion
Console.ReadLine();
}
}
}
//订阅者后端的逻辑
public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
{
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: exchangeName,
routingKey: "");
Console.WriteLine(" Waiting for msg....");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
if (handlerMsg != null)
{
if (!string.IsNullOrEmpty(message))
{
handlerMsg.Invoke(message);
}
}
else
{
Console.WriteLine($"订阅到消息:{message}");
}
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
}

九、Direct





Code:


//发布者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
static void Main(string[] args)
{
#region 发布订阅 交换机路由模式 Direct
string routerKeyValue = args[0].Split("=")[1];//如 abc.exe --name='qq'
Console.WriteLine("开始发布中。。。");
for (int i = 0; i < 20; i++)
{
string msg = $"小明有{i}只宝剑";
ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish(msg, routerKey: routerKeyValue);
//下面的为固定的写法
//ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish(msg); //ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish($"你好我好大家好{i}", routerKey:"onlylog");
}
Console.WriteLine("这次发布完毕。。。");
#endregion
Console.ReadLine();
}
}
}
//发布者后端逻辑 发布订阅的路由模式 Direct
/// <summary>
/// 发布 Direct 路由模式 Direct
/// </summary>
/// <param name="message"></param>
/// <param name="exchangeName"></param>
/// <param name="routerKey"></param>
public static void ExchangeRoutersByDirectModelPublish(string message, string exchangeName = "qqai", string routerKey = "insertToStudent")
{
using (IConnection connection = connectionFactory.CreateConnection())
{
using (IModel channelmodel = connection.CreateModel())
{
channelmodel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
byte[] bymsg = Encoding.UTF8.GetBytes(message);
channelmodel.BasicPublish(exchange: exchangeName, routingKey: routerKey, body: bymsg);
// byte[] bytemsg = Encoding.UTF8.GetBytes(message);
// channelmodel.BasicPublish(exchange: exchangeName,routingKey: routerKey,basicProperties: null,body: bytemsg);
}
}
}
//订阅者 Exchange Router路由 Director
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
static void Main(string[] args)
{
Console.WriteLine("开始消费中。。!");
if (args.Length > 0)
{
string routerKeyValue = args[0].Split("=")[1];
Console.WriteLine($"routerKey=>{routerKeyValue}");
if (!string.IsNullOrEmpty(routerKeyValue))ZrfRabbitMqHelper.ExchangeRoutersByDirectModelConsumer(routerKey: routerKeyValue, handler: msg =>
{
Console.WriteLine($"拿到消息:{msg}");
});
else
Console.WriteLine("没有获取到routerKey !");
}
//else
//{
// ZrfRabbitMqHelper.ExchangeRoutersByDirectModelConsumer(handler: msg =>
// {
// Console.WriteLine($"拿到消息:{msg}");
// });
//}
Console.ReadLine();
}
}
}

//订阅者 Exchange Router路由 Director 后端逻辑
public static void ExchangeRoutersByDirectModelConsumer(string exchangeName = "qqai", string routerKey = "insertToStudent", Action<string> handler = null)
{
var connection = connectionFactory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routerKey);
Console.WriteLine("wating for message...!");
var consumer = new EventingBasicConsumer(channel);
//(object sender, BasicDeliverEventArgs e)
consumer.Received += (sender, e) =>
{
var bytedata = e.Body.ToArray();
var getRoutekey = e.RoutingKey;
string msg = Encoding.UTF8.GetString(bytedata);
if (handler != null)
handler.Invoke(msg);
else
Console.WriteLine($"路由{getRoutekey},订阅到消息{msg}!");
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer);
}



- EOF -


推荐阅读  点击标题可跳转
IdentityServer4 3.1.x 迁移到 4.x.NET Core+MongoDB集群搭建与实战.NET 5.0 RC2 发布,正式版将在11月发布


看完本文有收获?请转发分享给更多人

关注「DotNet」加星标,提升.Net技能 

好文章,我在看❤️

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存